// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//
#ifndef YB_TSERVER_TS_TABLET_MANAGER_H
#define YB_TSERVER_TS_TABLET_MANAGER_H

#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include <boost/optional/optional_fwd.hpp>
#include <boost/thread/shared_mutex.hpp>
#include <gtest/gtest_prod.h>

#include "yb/rocksdb/cache.h"
#include "yb/rocksdb/options.h"
#include "yb/client/async_initializer.h"
#include "yb/client/client_fwd.h"
#include "yb/consensus/consensus.h"
#include "yb/consensus/metadata.pb.h"
#include "yb/gutil/macros.h"
#include "yb/gutil/ref_counted.h"
#include "yb/tablet/tablet_fwd.h"
#include "yb/tserver/tablet_peer_lookup.h"
#include "yb/tserver/tserver.pb.h"
#include "yb/tserver/tserver_admin.pb.h"
#include "yb/util/locks.h"
#include "yb/util/metrics.h"
#include "yb/util/status.h"
#include "yb/util/threadpool.h"
#include "yb/tablet/tablet_options.h"

namespace yb {

class PartitionSchema;
class FsManager;
class HostPort;
class Partition;
class Schema;
class BackgroundTask;

namespace consensus {
class RaftConfigPB;
} // namespace consensus

namespace master {
class ReportedTabletPB;
class TabletReportPB;
} // namespace master

namespace tserver {
class TabletServer;
class TSMemoryMonitorListener;

using rocksdb::MemoryMonitor;

// Map of tablet id -> transition reason string.
typedef std::unordered_map<std::string, std::string> TransitionInProgressMap;

class TransitionInProgressDeleter;

// If 'expr' fails, log a message, tombstone the given tablet, and return the
// error status.
#define TOMBSTONE_NOT_OK(expr, meta, uuid, msg, ts_manager_ptr) \
  do { \
    Status _s = (expr); \
    if (PREDICT_FALSE(!_s.ok())) { \
      LogAndTombstone((meta), (msg), (uuid), _s, ts_manager_ptr); \
      return _s; \
    } \
  } while (0)

#define SHUTDOWN_AND_TOMBSTONE_TABLET_PEER_NOT_OK(expr, tablet_peer, meta, uuid, msg, \
                                                  ts_manager_ptr) \
  do { \
    Status _s = (expr); \
    if (PREDICT_FALSE(!_s.ok())) { \
      if (tablet_peer) { \
        tablet_peer->Shutdown(); \
      } \
      LogAndTombstone((meta), (msg), (uuid), _s, ts_manager_ptr); \
      return _s; \
    } \
  } while (0)

// Keeps track of the tablets hosted on the tablet server side.
//
// TODO: will also be responsible for keeping the local metadata about
// which tablets are hosted on this server persistent on disk, as well
// as re-opening all the tablets at startup, etc.
class TSTabletManager : public tserver::TabletPeerLookupIf {
 public:
  typedef std::vector<scoped_refptr<tablet::TabletPeer>> TabletPeers;

  // Construct the tablet manager.
  // 'fs_manager' must remain valid until this object is destructed.
  TSTabletManager(FsManager* fs_manager,
                  TabletServer* server,
                  MetricRegistry* metric_registry);

  virtual ~TSTabletManager();

  // Load all tablet metadata blocks from disk, and open their respective tablets.
  // Upon return of this method all existing tablets are registered, but
  // the bootstrap is performed asynchronously.
  CHECKED_STATUS Init();

  // Waits for all the bootstraps to complete.
  // Returns Status::OK if all tablets bootstrapped successfully. If
  // the bootstrap of any tablet failed returns the failure reason for
  // the first tablet whose bootstrap failed.
  CHECKED_STATUS WaitForAllBootstrapsToFinish();

  // Shut down all of the tablets, gracefully flushing before shutdown.
  void Shutdown();

  ThreadPool* tablet_prepare_pool() const { return tablet_prepare_pool_.get(); }
  ThreadPool* raft_pool() const { return raft_pool_.get(); }
  ThreadPool* read_pool() const { return read_pool_.get(); }

  // Create a new tablet and register it with the tablet manager. The new tablet
  // is persisted on disk and opened before this method returns.
  //
  // If tablet_peer is non-NULL, the newly created tablet will be returned.
  //
  // If another tablet already exists with this ID, logs a DFATAL
  // and returns a bad Status.
  CHECKED_STATUS CreateNewTablet(
    const string &table_id,
    const string &tablet_id,
    const Partition &partition,
    const string &table_name,
    TableType table_type,
    const Schema &schema,
    const PartitionSchema &partition_schema,
    consensus::RaftConfigPB config,
    scoped_refptr<tablet::TabletPeer> *tablet_peer);

  // Delete the specified tablet.
  // 'delete_type' must be one of TABLET_DATA_DELETED or TABLET_DATA_TOMBSTONED
  // or else returns Status::IllegalArgument.
  // 'cas_config_opid_index_less_or_equal' is optionally specified to enable an
  // atomic DeleteTablet operation that only occurs if the latest committed
  // raft config change op has an opid_index equal to or less than the specified
  // value. If not, 'error_code' is set to CAS_FAILED and a non-OK Status is
  // returned.
  CHECKED_STATUS DeleteTablet(const std::string& tablet_id,
                      tablet::TabletDataState delete_type,
                      const boost::optional<int64_t>& cas_config_opid_index_less_or_equal,
                      boost::optional<TabletServerErrorPB::Code>* error_code);

  // Lookup the given tablet peer by its ID.
  // Returns true if the tablet is found successfully.
  bool LookupTablet(const std::string& tablet_id,
                    scoped_refptr<tablet::TabletPeer>* tablet_peer) const;

  // Same as LookupTablet but doesn't acquired the shared lock.
  bool LookupTabletUnlocked(const std::string& tablet_id,
                            scoped_refptr<tablet::TabletPeer>* tablet_peer) const;

  virtual CHECKED_STATUS GetTabletPeer(const std::string& tablet_id,
                               scoped_refptr<tablet::TabletPeer>* tablet_peer) const
                               override;

  virtual const NodeInstancePB& NodeInstance() const override;

  // Initiate remote bootstrap of the specified tablet.
  // See the StartRemoteBootstrap() RPC declaration in consensus.proto for details.
  // Currently this runs the entire procedure synchronously.
  // TODO: KUDU-921: Run this procedure on a background thread.
  virtual CHECKED_STATUS
      StartRemoteBootstrap(const consensus::StartRemoteBootstrapRequestPB& req) override;

  // Generate an incremental tablet report.
  //
  // This will report any tablets which have changed since the last acknowleged
  // tablet report. Once the report is successfully transferred, call
  // MarkTabletReportAcknowledged() to clear the incremental state. Otherwise, the
  // next tablet report will continue to include the same tablets until one
  // is acknowleged.
  //
  // This is thread-safe to call along with tablet modification, but not safe
  // to call from multiple threads at the same time.
  void GenerateIncrementalTabletReport(master::TabletReportPB* report);

  // Generate a full tablet report and reset any incremental state tracking.
  void GenerateFullTabletReport(master::TabletReportPB* report);

  // Mark that the master successfully received and processed the given
  // tablet report. This uses the report sequence number to "un-dirty" any
  // tablets which have not changed since the acknowledged report.
  void MarkTabletReportAcknowledged(const master::TabletReportPB& report);

  // Get all of the tablets currently hosted on this server.
  void GetTabletPeers(TabletPeers* tablet_peers) const;
  TabletPeers GetTabletPeers() const;

  // Callback used for state changes outside of the control of TsTabletManager, such as a consensus
  // role change. They are applied asynchronously internally.
  void ApplyChange(const std::string& tablet_id,
                   std::shared_ptr<consensus::StateChangeContext> context);

  // Marks tablet with 'tablet_id' dirty.
  // Used for state changes outside of the control of TsTabletManager, such as consensus role
  // changes.
  void MarkTabletDirty(const std::string& tablet_id,
                       std::shared_ptr<consensus::StateChangeContext> context);

  // Returns the number of tablets in the "dirty" map, for use by unit tests.
  int GetNumDirtyTabletsForTests() const;

  // Return the number of tablets in RUNNING or BOOTSTRAPPING state.
  int GetNumLiveTablets() const;

  CHECKED_STATUS RunAllLogGC();

  // Creates and updates the map of table to the set of tablets assigned per table per disk
  // for both data and wal directories.
  void GetAndRegisterDataAndWalDir(FsManager* fs_manager,
                                   const std::string& table_id,
                                   const std::string& tablet_id,
                                   const TableType table_type,
                                   std::string* data_root_dir,
                                   std::string* wal_root_dir);
  // Updates the map of table to the set of tablets assigned per table per disk
  // for both of the given data and wal directories.
  void RegisterDataAndWalDir(FsManager* fs_manager,
                            const std::string& table_id,
                            const std::string& tablet_id,
                            const TableType table_type,
                            const std::string& data_root_dir,
                            const std::string& wal_root_dir);
  // Removes the tablet id assigned to the table and disk pair for both the data and WAL directory
  // as pointed by the data and wal directory map.
  void UnregisterDataWalDir(const std::string& table_id,
                            const std::string& tablet_id,
                            const TableType table_type,
                            const std::string& data_root_dir,
                            const std::string& wal_root_dir);

  bool IsTabletInTransition(const std::string& tablet_id) const;

  TabletServer* server() { return server_; }

  MemoryMonitor* memory_monitor() { return tablet_options_.memory_monitor.get(); }

  // Flush some tablet if the memstore memory limit is exceeded
  void MaybeFlushTablet();

 private:
  FRIEND_TEST(TsTabletManagerTest, TestPersistBlocks);

  // Flag specified when registering a TabletPeer.
  enum RegisterTabletPeerMode {
    NEW_PEER,
    REPLACEMENT_PEER
  };

  // Each tablet report is assigned a sequence number, so that subsequent
  // tablet reports only need to re-report those tablets which have
  // changed since the last report. Each tablet tracks the sequence
  // number at which it became dirty.
  struct TabletReportState {
    uint32_t change_seq;
  };
  typedef std::unordered_map<std::string, TabletReportState> DirtyMap;

  // Returns Status::OK() iff state_ == MANAGER_RUNNING.
  CHECKED_STATUS CheckRunningUnlocked(boost::optional<TabletServerErrorPB::Code>* error_code) const;

  // Registers the start of a tablet state transition by inserting the tablet
  // id and reason string into the transition_in_progress_ map.
  // 'reason' is a string included in the Status return when there is
  // contention indicating why the tablet is currently already transitioning.
  // Returns IllegalState if the tablet is already "locked" for a state
  // transition by some other operation.
  // On success, returns OK and populates 'deleter' with an object that removes
  // the map entry on destruction.
  CHECKED_STATUS StartTabletStateTransitionUnlocked(const std::string& tablet_id,
                                            const std::string& reason,
                                            scoped_refptr<TransitionInProgressDeleter>* deleter);

  // Open a tablet meta from the local file system by loading its superblock.
  CHECKED_STATUS OpenTabletMeta(const std::string& tablet_id,
                        scoped_refptr<tablet::TabletMetadata>* metadata);

  // Open a tablet whose metadata has already been loaded/created.
  // This method does not return anything as it can be run asynchronously.
  // Upon completion of this method the tablet should be initialized and running.
  // If something wrong happened on bootstrap/initialization the relevant error
  // will be set on TabletPeer along with the state set to FAILED.
  //
  // The tablet must be registered and an entry corresponding to this tablet
  // must be put into the transition_in_progress_ map before calling this
  // method. A TransitionInProgressDeleter must be passed as 'deleter' into
  // this method in order to remove that transition-in-progress entry when
  // opening the tablet is complete (in either a success or a failure case).
  void OpenTablet(const scoped_refptr<tablet::TabletMetadata>& meta,
                  const scoped_refptr<TransitionInProgressDeleter>& deleter);

  // Open a tablet whose metadata has already been loaded.
  void BootstrapAndInitTablet(const scoped_refptr<tablet::TabletMetadata>& meta,
                              scoped_refptr<tablet::TabletPeer>* peer);

  // Add the tablet to the tablet map.
  // 'mode' specifies whether to expect an existing tablet to exist in the map.
  // If mode == NEW_PEER but a tablet with the same name is already registered,
  // or if mode == REPLACEMENT_PEER but a tablet with the same name is not
  // registered, a FATAL message is logged, causing a process crash.
  // Calls to this method are expected to be externally synchronized, typically
  // using the transition_in_progress_ map.
  void RegisterTablet(const std::string& tablet_id,
                      const scoped_refptr<tablet::TabletPeer>& tablet_peer,
                      RegisterTabletPeerMode mode);

  // Create and register a new TabletPeer, given tablet metadata.
  // Calls RegisterTablet() with the given 'mode' parameter after constructing
  // the TablerPeer object. See RegisterTablet() for details about the
  // semantics of 'mode' and the locking requirements.
  scoped_refptr<tablet::TabletPeer> CreateAndRegisterTabletPeer(
      const scoped_refptr<tablet::TabletMetadata>& meta,
      RegisterTabletPeerMode mode);

  // Helper to generate the report for a single tablet.
  void CreateReportedTabletPB(const std::string& tablet_id,
                              const scoped_refptr<tablet::TabletPeer>& tablet_peer,
                              master::ReportedTabletPB* reported_tablet);

  // Mark that the provided TabletPeer's state has changed. That should be taken into
  // account in the next report.
  //
  // NOTE: requires that the caller holds the lock.
  void MarkDirtyUnlocked(const std::string& tablet_id,
                         std::shared_ptr<consensus::StateChangeContext> context);

  // Handle the case on startup where we find a tablet that is not in
  // TABLET_DATA_READY state. Generally, we tombstone the replica.
  CHECKED_STATUS HandleNonReadyTabletOnStartup(const scoped_refptr<tablet::TabletMetadata>& meta);

  // Return the tablet with oldest write still in its memstore
  scoped_refptr<tablet::TabletPeer> TabletToFlush();

  TSTabletManagerStatePB state() const {
    boost::shared_lock<rw_spinlock> lock(lock_);
    return state_;
  }

  // Initializes the RaftPeerPB for the local peer.
  // Guaranteed to include both uuid and last_seen_addr fields.
  // Crashes with an invariant check if the RPC server is not currently in a
  // running state.
  void InitLocalRaftPeerPB();

  FsManager* const fs_manager_;

  TabletServer* server_;

  consensus::RaftPeerPB local_peer_pb_;

  typedef std::unordered_map<std::string, scoped_refptr<tablet::TabletPeer>> TabletMap;
  // This is a map that takes a table id and maps it to a map of directory and
  // set of tablets using that directory.
  typedef std::unordered_map<std::string,
                             std::unordered_map<std::string, std::unordered_set<std::string>>>
    TableDiskAssignmentMap;

  // Lock protecting tablet_map_, dirty_tablets_, state_, and
  // transition_in_progress_.
  mutable rw_spinlock lock_;

  // Map from tablet ID to tablet
  TabletMap tablet_map_;

  // Map from table ID to count of children in data and wal directories.
  TableDiskAssignmentMap table_data_assignment_map_;
  TableDiskAssignmentMap table_wal_assignment_map_;
  mutable Mutex dir_assignment_lock_;

  // Map of tablet ids -> reason strings where the keys are tablets whose
  // bootstrap, creation, or deletion is in-progress
  TransitionInProgressMap transition_in_progress_;

  // Tablets to include in the next incremental tablet report.
  // When a tablet is added/removed/added locally and needs to be
  // reported to the master, an entry is added to this map.
  DirtyMap dirty_tablets_;

  // Next tablet report seqno.
  int32_t next_report_seq_;

  MetricRegistry* metric_registry_;

  TSTabletManagerStatePB state_;

  // Thread pool used to open the tablets async, whether bootstrap is required or not.
  std::unique_ptr<ThreadPool> open_tablet_pool_;

  // Thread pool for preparing transactions, shared between all tablets.
  std::unique_ptr<ThreadPool> tablet_prepare_pool_;

  // Thread pool for apply transactions, shared between all tablets.
  std::unique_ptr<ThreadPool> apply_pool_;

  // Thread pool for Raft-related operations, shared between all tablets.
  std::unique_ptr<ThreadPool> raft_pool_;

  // Thread pool for read ops, that are run in parallel, shared between all tablets.
  std::unique_ptr<ThreadPool> read_pool_;

  // Used for scheduling flushes
  std::unique_ptr<BackgroundTask> background_task_;

  // For block cache and memory monitor shared across tablets
  tablet::TabletOptions tablet_options_;

  yb::client::AsyncClientInitialiser async_client_init_;

  DISALLOW_COPY_AND_ASSIGN(TSTabletManager);
};

// Helper to delete the transition-in-progress entry from the corresponding set
// when tablet boostrap, create, and delete operations complete.
class TransitionInProgressDeleter : public RefCountedThreadSafe<TransitionInProgressDeleter> {
 public:
  TransitionInProgressDeleter(TransitionInProgressMap* map, rw_spinlock* lock,
                              string entry);

 private:
  friend class RefCountedThreadSafe<TransitionInProgressDeleter>;
  ~TransitionInProgressDeleter();

  TransitionInProgressMap* const in_progress_;
  rw_spinlock* const lock_;
  const std::string entry_;
};

// Print a log message using the given info and tombstone the specified tablet.
// If tombstoning the tablet fails, a FATAL error is logged, resulting in a crash.
// If ts_manager pointer is passed in, it will unregister from the directory assignment map.
void LogAndTombstone(const scoped_refptr<tablet::TabletMetadata>& meta,
                     const std::string& msg,
                     const std::string& uuid,
                     const Status& s,
                     TSTabletManager* ts_manager = nullptr);

// Delete the tablet using the specified delete_type as the final metadata
// state. Deletes the on-disk data, as well as all WAL segments.
// If ts_manager pointer is passed in, it will unregister from the directory assignment map.
Status DeleteTabletData(const scoped_refptr<tablet::TabletMetadata>& meta,
                        tablet::TabletDataState delete_type,
                        const std::string& uuid,
                        const yb::OpId& last_logged_opid,
                        TSTabletManager* ts_manager = nullptr);

// Return Status::IllegalState if leader_term < last_logged_term.
// Helper function for use with remote bootstrap.
Status CheckLeaderTermNotLower(const std::string& tablet_id,
                               const std::string& uuid,
                               int64_t leader_term,
                               int64_t last_logged_term);

// Helper function to replace a stale tablet found from earlier failed tries.
Status HandleReplacingStaleTablet(scoped_refptr<tablet::TabletMetadata> meta,
                                  scoped_refptr<tablet::TabletPeer> old_tablet_peer,
                                  const std::string& tablet_id,
                                  const std::string& uuid,
                                  const int64_t& leader_term);

} // namespace tserver
} // namespace yb
#endif /* YB_TSERVER_TS_TABLET_MANAGER_H */
